/* Copyright (c) 2016 Lucky Byte, Inc.
 */
package com.lucky_byte.pay.p096;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.lucky_byte.pay.jar.Enums;
import com.lucky_byte.pay.jar.Jdbc;
import com.lucky_byte.pay.jar.JdbcRecord;
import com.lucky_byte.pay.jar.JdbcTable;


/**
 * Starts the POS 009.6 access servers configured for one host and then polls
 * the PostgreSQL notification channel {@code pay_i_096}, stopping, restarting
 * or starting servers when a configuration record of this host changes.
 *
 * <p>{@link #run()} blocks until {@link #stop()} is called or the running
 * thread is interrupted.
 *
 * <p>NOTE(review): {@code pools} is a plain {@code ArrayList} that is touched
 * both by {@link #stop()} and by the listener thread; only the stop flag is
 * made safe for cross-thread use here.
 */
public class PoolListener implements Runnable
{
	private static final Logger logger = LogManager.getLogger();

	/** Delay between two notification polls and before retrying after an SQL error. */
	private static final long POLL_INTERVAL_MS = 3000;

	private JdbcRecord host;
	private List<PooledServer> pools;

	// stop() may be called from a thread other than the one executing run(),
	// so the flag must be volatile for the listener loops to observe it.
	private volatile boolean stopped = true;

	/**
	 * @param host record describing this host; its "uuid" field selects the
	 *             configuration rows to serve and its "ipaddr" field is used
	 *             in log messages
	 */
	public PoolListener(JdbcRecord host) {
		this.host = host;
		this.pools = new ArrayList<>();
	}

	/**
	 * Builds the display label for the "mode" field of a channel record:
	 * "1-透传" (pass-through), "2-重组" (reassemble), otherwise
	 * "&lt;mode&gt;-未知" (unknown).
	 */
	private String getModeName(JdbcRecord chnl) {
		switch (chnl.getInteger("mode")) {
		case 1:
			return "1-透传";
		case 2:
			return "2-重组";
		default:
			return chnl.getInteger("mode") + "-未知";
		}
	}

	/**
	 * Starts the server pool for one configuration record.
	 *
	 * <p>The record is first decorated with the derived fields "mode_name",
	 * "chnl", "name" and "indirect". A disabled record is only logged. Any
	 * exception raised while creating or starting the pool is logged and
	 * not propagated.
	 */
	private void startupPool(JdbcRecord ichnl) {
		ichnl.setField("mode_name", this.getModeName(ichnl));
		ichnl.setField("chnl", Enums.CHNL_096);
		ichnl.setField("name", "POS 009.6");
		ichnl.setField("indirect", ichnl.getInteger("mode") == 2);

		if (ichnl.getBoolean("disabled")) {
			logger.info("POS 009.6 服务模式[{}]已被禁用.",
					ichnl.getString("mode_name"));
			return;
		}
		try {
			PooledServer pool = new PooledServer(ichnl, this.host);
			pools.add(pool);
			new Thread(pool).start();
		} catch (Exception e) {
			logger.catching(e);
		}
	}

	/**
	 * Loads every "pay_i_096" record of this host and starts a pool for each.
	 *
	 * @return {@code false} when the query fails or yields no record,
	 *         {@code true} otherwise
	 */
	private boolean startup() {
		JdbcTable table = JdbcTable.listBy1_NE("pay_i_096",
				"host = ?", host.getString("uuid"));
		if (table == null) {
			logger.error("查询 POS 009.6 接入服务配置错误，请检查.");
			return false;
		}
		if (table.getRecordCount() == 0) {
			logger.error("通过主机地址 {} 未能查询到 POS 009.6 接入渠道配置，请检查.",
					host.getString("ipaddr"));
			return false;
		}
		for (int i = 0; i < table.getRecordCount(); i++) {
			startupPool(table.getRecord(i));
		}
		stopped = false;
		return true;
	}

	/**
	 * Stops every running pool and asks the listener loop to finish.
	 * The listener notices the request at its next poll, i.e. within
	 * roughly one poll interval. Calling this while already stopped is a
	 * no-op.
	 */
	public void stop() {
		if (!stopped) {
			for (PooledServer pool : pools) {
				pool.stop();
			}
			stopped = true;
		}
	}

	/**
	 * Starts the configured pools and then listens for configuration
	 * changes until stopped or interrupted. Returns immediately when the
	 * initial startup fails.
	 */
	@Override
	public void run() {
		if (!startup()) {
			return;
		}
		while (!stopped) {
			try {
				this.listenDBNotification();
			} catch (InterruptedException e) {
				logger.info("线程被中断，不再监听数据表修改事件.");
				// Restore the interrupt status for whoever owns this thread.
				Thread.currentThread().interrupt();
				break;
			}
		}
	}

	/**
	 * Subscribes to the "pay_i_096" channel and polls it for notifications
	 * until {@link #stop()} is called.
	 *
	 * <p>On an SQL error the error is logged and the method returns after
	 * one poll interval, so that the retry performed by {@link #run()} does
	 * not spin while the database is unavailable.
	 *
	 * @throws InterruptedException if the thread is interrupted while
	 *         waiting between polls
	 */
	private void listenDBNotification() throws InterruptedException {
		try {
			// NOTE(review): this connection is never closed. Whether
			// Jdbc.getConnection() hands out a dedicated or a shared
			// connection is not visible from here; if it is dedicated it
			// should be closed when this method exits — confirm.
			Connection conn = Jdbc.getConnection();
			PGConnection pgconn = (PGConnection)conn;
			try (Statement stmt = conn.createStatement()) {
				stmt.execute("listen pay_i_096");
			}
			while (!stopped) {
				PGNotification[] notifications = pgconn.getNotifications();
				if (notifications != null) {
					for (PGNotification notification : notifications) {
						logger.info("POS 009.6 接入服务配置被修改，开始处理...");
						this.respNotification(notification);
					}
				}
				Thread.sleep(POLL_INTERVAL_MS);
			}
		} catch (SQLException e) {
			logger.error("SQL error: {}[{}]", e.getMessage(),
					e.getClass().getSimpleName());
			// Back off before the caller retries.
			Thread.sleep(POLL_INTERVAL_MS);
		}
	}

	/**
	 * Reacts to one database notification.
	 *
	 * <p>The payload must be a JSON object with string fields "host" and
	 * "uuid". Notifications for another host are ignored. Otherwise the
	 * record with that UUID is reloaded and the matching running pool is
	 * stopped (record disabled), restarted (listen address or port changed)
	 * or left alone; when no pool matches and the record is enabled, a new
	 * pool is started.
	 */
	private void respNotification(PGNotification notification) {
		if (!notification.getName().equals("pay_i_096")) {
			logger.fatal("数据库通知 channel 名应该为 pay_i_096.");
			return;
		}
		String parameter = notification.getParameter();
		if (parameter == null) {
			logger.warn("数据库通知参数为空，请检查...");
			return;
		}
		Map<String, String> params;
		try {
			params = new Gson().fromJson(parameter,
				new TypeToken<HashMap<String, String>>() {}.getType());
			if (params == null) {
				logger.warn("数据库通知参数[{}]无效，请检查...", parameter);
				return;
			}
		} catch (Exception e) {
			logger.warn("数据库通知参数[{}]不是有效的 JSON 格式.", parameter);
			return;
		}
		if (params.get("host") == null || params.get("uuid") == null) {
			logger.warn("数据库通知参数[{}]缺少必须的字段.", parameter);
			return;
		}
		if (!params.get("host").equals(this.host.getString("uuid"))) {
			logger.debug("非本服务器配置修改，不做处理.");
			return;
		}
		String uuid = params.get("uuid");

		JdbcTable table = JdbcTable.listBy1_NE("pay_i_096", "uuid = ?", uuid);
		if (table == null || table.getRecordCount() == 0) {
			logger.error("未找到 UUID 为[{}]的 POS 009.6 配置.", uuid);
			return;
		}
		JdbcRecord ichnl = table.getRecord(0);
		int new_addr = ichnl.getInteger("listen_addr");
		int new_port = ichnl.getInteger("listen_port");

		// Look the pool up first so the list is never modified while it is
		// being iterated.
		PooledServer pool = findPool(uuid);
		if (pool == null) {
			// No running service matches: start one unless it is disabled.
			if (!ichnl.getBoolean("disabled")) {
				// "mode_name" is only set by startupPool(), so compute the
				// label here instead of reading it from the fresh record.
				logger.info("启动 POS 009.6 接入服务，模式[{}]，服务端口[{}]...",
						this.getModeName(ichnl),
						ichnl.getInteger("listen_port"));
				startupPool(ichnl);
			}
			return;
		}

		JdbcRecord pool_ichnl = pool.getIChnl();
		if (ichnl.getBoolean("disabled")) {
			// The service has been disabled: stop it.
			logger.info("POS 009.6 服务模式[{}]被禁用，停止服务...",
					pool_ichnl.getString("mode_name"));
			pool.stop();
			pools.remove(pool);
		} else if (pool_ichnl.getInteger("listen_addr") != new_addr ||
				 pool_ichnl.getInteger("listen_port") != new_port) {
			// The listen address or port changed: restart the service.
			logger.info("POS 009.6 服务模式[{}]监听地址[{}:{}]被修改为"
					+ "[{}:{}]，重启服务...",
					pool_ichnl.getString("mode_name"),
					pool_ichnl.getInteger("listen_addr"),
					pool_ichnl.getInteger("listen_port"),
					ichnl.getInteger("listen_addr"),
					ichnl.getInteger("listen_port"));
			pool.stop();
			pools.remove(pool);
			startupPool(ichnl);
		} else {
			logger.info("配置变更不影响服务正常运行，无需处理.");
		}
	}

	/**
	 * Returns the running pool whose channel record has the given UUID,
	 * or {@code null} when there is none.
	 */
	private PooledServer findPool(String uuid) {
		for (PooledServer pool : pools) {
			if (pool.getIChnl().getString("uuid").equals(uuid)) {
				return pool;
			}
		}
		return null;
	}

}
